/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.alibaba.nacossync.extension.impl;

import java.lang.reflect.Field;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.NacosNamingService;
import com.alibaba.nacos.client.naming.core.ServerListManager;
import com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.eureka.EurekaNamingService;
import com.alibaba.nacossync.extension.event.SpecialSyncEventBus;
import com.alibaba.nacossync.extension.holder.EurekaServerHolder;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.HttpUtil;
import com.alibaba.nacossync.util.NacosUtil;
import com.netflix.appinfo.InstanceInfo;

import lombok.extern.slf4j.Slf4j;

/**
 * eureka
 *
 * @author paderlol
 * @author fenglibin
 * @date: 2018-12-31 16:25
 */
@Slf4j
@NacosSyncService(sourceCluster = ClusterTypeEnum.EUREKA, destinationCluster = ClusterTypeEnum.NACOS)
public class EurekaSyncToNacosServiceImpl implements SyncService {

    private final MetricsManager metricsManager;

    private final EurekaServerHolder eurekaServerHolder;
    private final SkyWalkerCacheServices skyWalkerCacheServices;

    private final NacosServerHolder nacosServerHolder;

    private final SpecialSyncEventBus specialSyncEventBus;
    
    // 保留Eureka中每个应用与提供的服务列表的MD5值，避免重复更新Nacos中的节点信息
    private final Map<String,String> serviceInstanceMd5Map = new ConcurrentHashMap<String,String>();
    
    private final Map<String,List<InstanceInfo>> lastApplications = new HashMap<String,List<InstanceInfo>>();

    @Autowired
    public EurekaSyncToNacosServiceImpl(EurekaServerHolder eurekaServerHolder,
        SkyWalkerCacheServices skyWalkerCacheServices, NacosServerHolder nacosServerHolder,
        SpecialSyncEventBus specialSyncEventBus, MetricsManager metricsManager) {
        this.eurekaServerHolder = eurekaServerHolder;
        this.skyWalkerCacheServices = skyWalkerCacheServices;
        this.nacosServerHolder = nacosServerHolder;
        this.specialSyncEventBus = specialSyncEventBus;
        this.metricsManager = metricsManager;
    }
    
    @Override
	public boolean stop(TaskDO taskDO) {
    	specialSyncEventBus.unsubscribe(taskDO);
		return true;
	}

    @Override
    public boolean delete(TaskDO taskDO) {

        try {
            specialSyncEventBus.unsubscribe(taskDO);
            EurekaNamingService eurekaNamingService = eurekaServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
            NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
            
            Map<String,List<InstanceInfo>> applications = new HashMap<String,List<InstanceInfo>>();
            if(SkyWalkerConstants.STAR.equals(taskDO.getServiceName())) {//获取全部
            	applications.putAll(eurekaNamingService.getAllApplications());
            }else {//获取单个或多个
            	applications.putAll(eurekaNamingService.getApplications(taskDO.getServiceName()));
            }
            applications.forEach((serviceName,eurekaInstances)->{
            	TaskDO currentTaskDO = new TaskDO();
            	BeanUtils.copyProperties(taskDO, currentTaskDO);
            	currentTaskDO.setServiceName(serviceName);
            	try {
					deleteAllInstanceFromNacos(taskDO, destNamingService, eurekaInstances);
				} catch (NacosException e) {
					log.error("delete a task from eureka to nacos was failed, taskId:{}, serviceName:{}, error:{}", taskDO.getTaskId(), serviceName, e.toString());
		            metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
				}
            });
        } catch (Exception e) {
            log.error("delete a task from eureka to nacos was failed, taskId:{}, serviceName:{}, error:{}", taskDO.getTaskId(), taskDO.getServiceName(), e.toString());
            metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
            return false;
        }
        return true;
    }

    @Override
    public boolean sync(TaskDO taskDO) {
        try {
        	
            EurekaNamingService eurekaNamingService = eurekaServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
            NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
            
            Map<String,List<InstanceInfo>> currentApplications = new HashMap<String,List<InstanceInfo>>();
            if(SkyWalkerConstants.STAR.equals(taskDO.getServiceName())) {//获取全部
            	currentApplications.putAll(eurekaNamingService.getAllApplications());
            }else {//获取单个或多个
            	currentApplications.putAll(eurekaNamingService.getApplications(taskDO.getServiceName()));
            }
            
            currentApplications.forEach((serviceName,eurekaInstances)->{
            	TaskDO currentTaskDO = new TaskDO();
            	BeanUtils.copyProperties(taskDO, currentTaskDO);
            	currentTaskDO.setServiceName(serviceName);
            	try{
            		List<Instance> nacosInstances = destNamingService.getAllInstances(currentTaskDO.getServiceName(),currentTaskDO.getGroupName());
            		if (CollectionUtils.isEmpty(eurekaInstances) && !CollectionUtils.isEmpty(nacosInstances)) {
	                    // Clear all instance from Nacos，从Nacos中删除不存在于Eureka（该服务从Eureka同步过来的）的服务
	                    deleteAllInstance(currentTaskDO, destNamingService, nacosInstances);
	                }
            		
            		if(!isNeedHandle(taskDO.getNameSpace(), serviceName, eurekaInstances)) {
            			return;
            		}
					
                    if (!CollectionUtils.isEmpty(nacosInstances)) {
                        // Remove invalid instance from Nacos
                        removeInvalidInstance(currentTaskDO, destNamingService, eurekaInstances, nacosInstances);
                    }
                    addValidInstance(currentTaskDO, destNamingService, eurekaInstances);
	                
            	}catch(Exception e) {
            		log.error("update nacos by eureka instance failed, taskId:{}, serviceName:{}, error={}", currentTaskDO.getTaskId(), currentTaskDO.getServiceName(), e.toString());
                    metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
            	}
            });
            
            //从Nacos中删除不存在于Eureka中的服务
            deleteServicesNotExistsInEurekaFromNacos(taskDO, currentApplications, destNamingService);
            
            specialSyncEventBus.subscribe(taskDO, this::sync);
        } catch (Exception e) {
            log.error("sync task from eureka to nacos was failed, taskId:{},error:{}", taskDO.getTaskId(), e.toString());
            metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
            return false;
        }
        return true;
    }
    
    /**
     * 往Nacos中注册Provider节点。<br>
     * @param taskDO
     * @param destNamingService
     * @param eurekaInstances
     * @throws NacosException
     * @throws NoSuchFieldException
     * @throws SecurityException
     * @throws IllegalArgumentException
     * @throws IllegalAccessException
     */
    private void addValidInstance(TaskDO taskDO, NamingService destNamingService, List<InstanceInfo> eurekaInstances)
            throws NacosException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
        int idx=0;
    	for (InstanceInfo instance : eurekaInstances) {
            if (needSync(instance.getMetadata(),taskDO)) {
            	idx++;
            	log.info("Add service instance from Eureka to Nacos, serviceName={}, Ip={}, port={}, namespace={}, group={}, idx={}",
            			instance.getAppName(), instance.getIPAddr(), instance.getPort(), taskDO.getNameSpace(), taskDO.getGroupName(),idx);
            	
            	if(idx == 1) {
            		/* 
            		 * 第一个使用GRPC注册，保持长连接，后续的使用HTTP的方式注册,用于解决当前Nacos客户端进程只能够注册一个应用节点的问题
            		 * 问题描述：目前Nacos客户端在同一个应用端进程中，对相同服务的Provier只能够注册一个节点，
            		 * 如果该服务有多个节点，则最后注册的节点会替换前面注册的节点。
            		 */
	                destNamingService.registerInstance(taskDO.getServiceName(), taskDO.getGroupName(), buildSyncInstance(instance, taskDO));
            	}else {
            		addValidInstance(taskDO, destNamingService, instance);
            	}
            }
        }
    }
    /**
     * 往Nacos中注册Provider节点。<br>
     * 通过HTTP的方式往Nacos中注册节点，NacosSync不需要与Nacos建立长连接，针对同一个应用在相同Namespace下，可以创建多个Provider节点。
     * @param taskDO
     * @param destNamingService
     * @param instanceInfo
     * @throws NacosException
     * @throws NoSuchFieldException
     * @throws SecurityException
     * @throws IllegalArgumentException
     * @throws IllegalAccessException
     */
    private void addValidInstance(TaskDO taskDO, NamingService destNamingService, InstanceInfo instanceInfo)
        throws NacosException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
    	
    	//通过反射获取Nacos服务的地址
    	NacosNamingService nacosNamingService = (NacosNamingService)destNamingService;
    	Field clientProxyField = nacosNamingService.getClass().getDeclaredField("clientProxy");
    	clientProxyField.setAccessible(true);
    	NamingClientProxyDelegate clientProxyDelegate = (NamingClientProxyDelegate) clientProxyField.get(nacosNamingService);
    	Field serverListManagerField = clientProxyDelegate.getClass().getDeclaredField("serverListManager");
    	serverListManagerField.setAccessible(true);
    	ServerListManager serverListManager = (ServerListManager)serverListManagerField.get(clientProxyDelegate);
    	
    	List<String> serverList = serverListManager.getServerList();
    	String nacosServerUrl = serverList.get(0)+"/nacos/v1/ns/instance";
    	if(!nacosServerUrl.startsWith("http")) {
    		nacosServerUrl = "http://"+nacosServerUrl;
    	}
    	
    	Instance instance = buildSyncInstance(instanceInfo, taskDO);
    	
    	MultiValueMap<String, String> uriVariables = new LinkedMultiValueMap<>();
    	uriVariables.add("ip", instanceInfo.getIPAddr());
    	uriVariables.add("port", instanceInfo.getPort()+"");
    	uriVariables.add("namespaceId", taskDO.getNameSpace());
    	uriVariables.add("weight", "1");
    	uriVariables.add("enabled", "true");
    	uriVariables.add("healthy", "true");
    	uriVariables.add("metadata", JSONObject.toJSONString(instance.getMetadata()));
    	uriVariables.add("clusterName", instanceInfo.getAppName());
    	uriVariables.add("serviceName", instanceInfo.getAppName());
    	uriVariables.add("groupName", taskDO.getGroupName());
    	uriVariables.add("ephemeral", "true");
    	//发起HTTP注册请求
		UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(nacosServerUrl).queryParams(uriVariables);
		log.info("发起Nacos注册的URL："+builder.toUriString());
		String response = HttpUtil.sendPost(builder.toUriString());
		log.info("Add service instance from Eureka to Nacos result: {}", response);

    }

    /**
     * 从Nacos中删除所有Eureka注册过去的服务Provider节点
     * @param taskDO
     * @param destNamingService
     * @param eurekaInstances
     * @throws NacosException
     */
    private void deleteAllInstanceFromNacos(TaskDO taskDO, NamingService destNamingService,
        List<InstanceInfo> eurekaInstances)
        throws NacosException {
        if (CollectionUtils.isEmpty(eurekaInstances)) {
            return;
        }
        for (InstanceInfo instance : eurekaInstances) {
            if (needSync(instance.getMetadata(),taskDO)) {
                log.info("Delete service instance from Nacos, serviceName={}, Ip={}, port={}, namespace={}, group={}",
                    instance.getAppName(), instance.getIPAddr(), instance.getPort(), taskDO.getNameSpace(), taskDO.getGroupName());
                destNamingService.deregisterInstance(taskDO.getServiceName(), taskDO.getGroupName(), buildSyncInstance(instance, taskDO));
            }
        }
    }
    /**
     * 从Nacos中删除(取消注册)已经不存在于Eureka中的Provider节点
     * @param taskDO
     * @param destNamingService
     * @param eurekaInstances
     * @param nacosInstances
     * @throws NacosException
     */
    private void removeInvalidInstance(TaskDO taskDO, NamingService destNamingService,
        List<InstanceInfo> eurekaInstances, List<Instance> nacosInstances) throws NacosException {
        for (Instance instance : nacosInstances) {
            if (!isExistInEurekaInstance(eurekaInstances, instance) && needDelete(instance.getMetadata(), taskDO)) {
                log.info("Remove invalid service instance from Nacos, serviceName={}, Ip={}, port={}, namespace={}, group={}",
                    instance.getServiceName(), instance.getIp(), instance.getPort(), taskDO.getNameSpace(), taskDO.getGroupName());
                destNamingService.deregisterInstance(taskDO.getServiceName(), taskDO.getGroupName(), instance.getIp(), instance.getPort());
            }
        }
    }

    /**
     * 判断当前服务在Nacos中的Provider节点，是否还存在于从Eureka中获取的当前服务的Provider列表
     * @param eurekaInstances
     * @param nacosInstance
     * @return 
     */
    private boolean isExistInEurekaInstance(List<InstanceInfo> eurekaInstances, Instance nacosInstance) {
    	if(CollectionUtils.isEmpty(eurekaInstances)) {
    		return false;
    	}
        return eurekaInstances.stream().anyMatch(instance -> instance.getIPAddr().equals(nacosInstance.getIp())
            && instance.getPort() == nacosInstance.getPort());
    }

    /**
     * 从Nacos中删除不存在于Eureka（该服务从Eureka同步过来的）的服务
     * @param taskDO
     * @param destNamingService
     * @param allInstances
     * @throws NacosException
     */
    private void deleteAllInstance(TaskDO taskDO, NamingService destNamingService, List<Instance> allInstances)
        throws NacosException {
    	if(CollectionUtils.isEmpty(allInstances)) {
    		return;
    	}
        for (Instance instance : allInstances) {
            if (needDelete(instance.getMetadata(), taskDO)) {
                destNamingService.deregisterInstance(taskDO.getServiceName(), taskDO.getGroupName(), instance);
            }

        }
    }

    private Instance buildSyncInstance(InstanceInfo instance, TaskDO taskDO) {
        Instance temp = new Instance();
        temp.setIp(instance.getIPAddr());
        temp.setPort(instance.getPort());
        temp.setServiceName(instance.getAppName());
        temp.setClusterName(instance.getAppName());
        temp.setHealthy(true);

        Map<String, String> metaData = new HashMap<>(instance.getMetadata());
        metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
        metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
            skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
        metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
        temp.setMetadata(metaData);
        return temp;
    }
    
    /**
     * 判断当前服务的提供者是否有改变并需要处理
     * @param namespace 命名空间
     * @param serviceName 服务名称
     * @param eurekaInstances 服务提供者列表信息
     * @return 是否需要处理
     */
    private boolean isNeedHandle(String namespace, String serviceName, List<InstanceInfo> eurekaInstances) {
    	if(CollectionUtils.isEmpty(eurekaInstances)) {
    		return false;
    	}
    	String keyName = new StringBuilder(namespace).append(SkyWalkerConstants.UNDERLINE).append(serviceName).toString();
    	String oldInstanceInfoMd5 = serviceInstanceMd5Map.get(keyName);
        String instanceInfoMd5 = "";
        StringBuilder instanceInfo = new StringBuilder(serviceName);
		try {
			eurekaInstances.forEach(instance->{
				instanceInfo.append(SkyWalkerConstants.UNDERLINE).append(instance.getIPAddr())
				.append(SkyWalkerConstants.UNDERLINE).append(instance.getPort())
				.append(SkyWalkerConstants.UNDERLINE).append(instance.getLeaseInfo().getRegistrationTimestamp());
			});
			instanceInfo.append(SkyWalkerConstants.COLON);
			instanceInfoMd5 = MD5Utils.md5Hex(instanceInfo.toString().getBytes());
		} catch (NoSuchAlgorithmException e) {
			
		}
        if(oldInstanceInfoMd5 == null || !instanceInfoMd5.contentEquals(oldInstanceInfoMd5)) {
        	serviceInstanceMd5Map.put(keyName, instanceInfoMd5);
        	return true;
        }else if(instanceInfoMd5.contentEquals(oldInstanceInfoMd5)){
        	//log.warn("The service "+serviceName+"'s two eureka response has the same content, ignore the response.");
        	return false;
        }
        return true;
    }
    
    /**
     * 从Nacos中删除不存在于Eureka中的服务
     * 
     * @param taskDO 当前定时任务对象
     * @param currentApplications 当前应用名称与对应服务实例的Map
     * @param destNamingService 目标名称服务，此处指NacosNamingService
     */
    private void deleteServicesNotExistsInEurekaFromNacos(TaskDO taskDO,Map<String,List<InstanceInfo>> currentApplications,NamingService destNamingService) {
    	//从Nacos中删除不存在于Eureka中的服务
        if(SkyWalkerConstants.STAR.equals(taskDO.getServiceName())) {
        	//　所有服务的名称列表
        	List<String> servicesNameList = new ArrayList<String>();
        	
        	if(CollectionUtils.isEmpty(lastApplications)) {//说明应该是第一次运行该任务，需要从Nacos中获取全部的服务及实例
        		//从Nacos获取所有的服务名称
        		servicesNameList.addAll(NacosUtil.getAllNacosServiceNameList(taskDO.getGroupName(), destNamingService));
        	}else {
        		servicesNameList.addAll(lastApplications.keySet());
        	}
        	servicesNameList.forEach(serviceName->{
            	List<InstanceInfo> eurekaInstances = currentApplications.get(serviceName);
            	if(eurekaInstances==null) {//说明当前存在于Nacos中的服务，已经不存在Eureka中了，需要将其从Nacos中移除
            		TaskDO currentTaskDO = new TaskDO();
                	BeanUtils.copyProperties(taskDO, currentTaskDO);
                	currentTaskDO.setServiceName(serviceName);
					try {
						List<Instance> nacosInstances = destNamingService.getAllInstances(currentTaskDO.getServiceName(),currentTaskDO.getGroupName());
						removeInvalidInstance(currentTaskDO, destNamingService, eurekaInstances, nacosInstances);
					} catch (NacosException e) {
						log.error("从Nacos中删除不存在于Eureka中的服务["+serviceName+"]发生异常:"+e.getMessage(),e);
					}
            		
            	}
            });
        }
        lastApplications.clear();
        lastApplications.putAll(currentApplications);
    }

}
